[AWS IoT] Kinesis Firehoseに連携しLambdaまで繋げてみる – (1)概要とKinesis Firehose以降の構築

[AWS IoT] Kinesis Firehoseに連携しLambdaまで繋げてみる – (1)概要とKinesis Firehose以降の構築

Clock Icon2016.02.12

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

AWS IoTでは受信したメッセージの送信先としてKinesis Firehoseを選択することができます。Kinesis Firehoseでは受信したメッセージをS3やRedshiftに書き込むことができます。今回はAWS IoTで受信したメッセージを、Kinesis Firehose → S3 → Lambdaの順に連携するサンプルを作成しました。図にすると以下のようになります。 aws_iot_kinesis_firehose_lambda_1

AWS IoT〜S3までは先に書いたように各サービスによる連携を、S3からLambdaの連携についてもS3へのオブジェクト配置時にLambdaが起動するように設定しました。

このような構成にした理由としては、以下の様な要件を想定したためです。

  • IoTクライアントから送信されるデータをKinesis Firehoseで受信する
  • 受信したオリジナルデータは加工せずにS3に貯めておきたい
  • Lambdaに連携し、データを加工しRedshiftやDyamoDBに登録したい

サンプルの作成手順(Kinesis Firehose以降)

ではサンプルの作成手順についてです。今回はKinesis Firehose〜Lambdaまでの作成手順について記述します。AWS IoTの作成については次回までお待ち下さい。

0.環境要件

AWS環境の構築にはAWS CLI、LambdaのソースはJava8を使用し、リージョンはオレゴンを選択しました。

1.Lambdaの作成

最初にメッセージを受信するLambdaについてです。S3にファイルを配置時に呼び出されて、そのファイルのパスと内容をログに出力するソースです。

LambdaFunctionHandler.java
package com.classmethod.sample;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.S3Event;
import lombok.val;

public class LambdaFunctionHandler implements RequestHandler<S3Event, Object> {
    InputData inputData = null;
    
    @Override
    public Object handleRequest(S3Event input, Context context) {        
        val logger = context.getLogger();
        val record = input.getRecords().get(0);
        
        logger.log("********** Log Start **********" + "\n");
        logger.log(record.getEventName() + "\n");
        logger.log(record.getS3().getBucket().getName() + "\n");
        logger.log(record.getS3().getObject().getKey() + "\n");
        
        if(inputData == null) inputData = new InputData();
        val lines = inputData.read(record.getS3().getBucket().getName(),
                                   record.getS3().getObject().getKey());
        logger.log("input value = " + "\n");
        logger.log(lines);
        logger.log("\n");
        logger.log("********** Log End **********" + "\n");

        return lines;
    }
}
InputData.java
package com.classmethod.sample;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import lombok.val;
import lombok.SneakyThrows;

public class InputData {
    public String read(String bucketName, String key){
        val s3Client = new AmazonS3Client(new EnvironmentVariableCredentialsProvider());
        val s3Object = s3Client.getObject(new GetObjectRequest(bucketName, key));
        return readS3File(s3Object.getObjectContent());
    }
    
    @SneakyThrows 
    private String readS3File(InputStream input){
        try(val reader = new BufferedReader(new InputStreamReader(input))){
            val lines = new StringBuilder();
            while (true) {
                String line = reader.readLine();
                if (line == null) break;
                lines.append(line + "\n");
            }
            return lines.toString();
        }
    }
}
build.gradle
apply plugin: 'java'

def defaultEncoding = 'UTF-8'
[compileJava, compileTestJava]*.options*.encoding = defaultEncoding

group = 'com.classmethod.sample'
version = '0.0.1-SNAPSHOT'

description = "LambdaS3Read"

sourceCompatibility = 1.8
targetCompatibility = 1.8

repositories {
    jcenter()
}

dependencies {
    compile 'com.amazonaws:aws-lambda-java-core:1.0.0'
    compile 'com.amazonaws:aws-lambda-java-events:1.0.0'
    compile 'org.projectlombok:lombok:1.16.6'

    testCompile 'junit:junit:4.11'
}

jar {
    from configurations.compile.collect { it.isDirectory() ? it : zipTree(it) }
}

このJavaソースをビルドし、作成したjarをLambda Functionとして登録します。Function名は「LambdaS3Read」、ロールのロールポリシーは以下のようにしました(が、Redshiftへの権限は必要ないですね・・・)。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "logs:*"
            ],
            "Resource": "arn:aws:logs:*:*:*"
        },
        {
            "Action": [
                "redshift:*"
            ],
            "Effect": "Allow",
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ReadObject"
            ],
            "Resource": [
                "arn:aws:s3:::*"
            ]
        }
    ]
}

2.S3のバケット作成

次にS3にバケットを作成し、上記のLambdaを呼び出すようにします。先ずは以下のコマンドでバケットを作成します。

$ aws s3 mb s3://t-honda-kinesis-firehose
make_bucket: s3://t-honda-kinesis-firehose/

Lambdaに権限を追加します。

$ aws lambda add-permission --function-name LambdaS3Read --statement-id s3-put-event --action lambda:InvokeFunction --principal s3.amazonaws.com --source-arn arn:aws:s3:::t-honda-kinesis-firehose
{
    "Statement": "{\"Condition\":{\"ArnLike\":{\"AWS:SourceArn\":\"arn:aws:s3:::t-honda-kinesis-firehose\"}},\"Action\":[\"lambda:InvokeFunction\"],\"Resource\":\"arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:LambdaS3Read\",\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"s3.amazonaws.com\"},\"Sid\":\"s3-put-event\"}"
}

バケットにオブジェクトを配置時、Lambdaを呼び出すようにします。読み出すLambdaのArn等の定義はnotification_configuration.jsonに記述しています。

$ aws s3api put-bucket-notification-configuration --bucket t-honda-kinesis-firehose --notification-configuration file://notification_configuration.json
notification_configuration.json
{"LambdaFunctionConfigurations": [{"LambdaFunctionArn": "arn:aws:lambda:us-west-2:XXXXXXXXXXXX:function:LambdaS3Read", "Events": ["s3:ObjectCreated:Put"]}]}

3.Kinesisの実行ロール作成

Kinesisを実行するロールを作成します。権限の詳細はassume_role_policy.jsonに定義しています。

$ aws iam create-role --role-name t-honda-kinesis-firehose_role --assume-role-policy-document file://assume_role_policy.json
{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Version": "2012-10-17", 
            "Statement": [
                {
                    "Action": "sts:AssumeRole", 
                    "Principal": {
                        "Service": "firehose.amazonaws.com"
                    }, 
                    "Effect": "Allow", 
                    "Sid": ""
                }
            ]
        }, 
        "RoleId": "AROAIDIXI655PEBY5VUYA", 
        "CreateDate": "2016-02-09T22:42:58.017Z", 
        "RoleName": "t-honda-kinesis-firehose_role", 
        "Path": "/", 
        "Arn": "arn:aws:iam::XXXXXXXXXXXX:role/t-honda-kinesis-firehose_role"
    }
}
assume_role_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "Service": "firehose.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}

作成したロールにロールポリシーを付与します。ポリシーの詳細はrole_policy.jsonに定義しています。

aws iam put-role-policy --role-name t-honda-kinesis-firehose_role --policy-name KinesisFirehosePolicy --policy-document file://role_policy.json
role_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Action": [
        "s3:AbortMultipartUpload",
        "s3:GetBucketLocation",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:ListBucketMultipartUploads",
        "s3:PutObject"
      ],
      "Resource": [
        "arn:aws:s3:::t-honda-kinesis-firehose",
        "arn:aws:s3:::t-honda-kinesis-firehose/*"
      ]
    }
  ]
}

4.Kinesis Firehoseの作成

Kinesis Firehoseを作成します。kinesis_input.jsonのRoleARNに先ほど作成したロールのARNと、BucketARNに送信先のバケットを指定していることに注意してください。

$ aws firehose create-delivery-stream --delivery-stream-name t-honda-kinesis-to-s3 --cli-input-json file://kinesis_input.json
{
    "DeliveryStreamARN": "arn:aws:firehose:us-west-2:XXXXXXXXXXXX:deliverystream/t-honda-kinesis-to-s3"
}
kinesis_input.json
{
    "DeliveryStreamName": "", 
    "S3DestinationConfiguration": {
        "RoleARN": "arn:aws:iam::XXXXXXXXXXXX:role/t-honda-kinesis-firehose_role", 
        "BucketARN": "arn:aws:s3:::t-honda-kinesis-firehose", 
        "Prefix": "", 
        "BufferingHints": {
            "SizeInMBs": 1, 
            "IntervalInSeconds": 60
        }, 
        "CompressionFormat": "UNCOMPRESSED", 
        "EncryptionConfiguration": {
            "NoEncryptionConfig": "NoEncryption"
        }
    }
}

まとめ

ここまででKinesis Firehoseで受信したメッセージをS3に配置し、Lambdaを起動するまでを構築することができました。後はクライアントからメッセージを受信するAWS IoTの設定を行うだけです。これについては、次回に記述します。

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.